/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.job.reindex.service.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.service.job.JobCommonParameters;
import org.unidata.mdm.core.util.JobUtils;
import org.unidata.mdm.job.reindex.configuration.ReindexJobConfigurationConstants;
import org.unidata.mdm.meta.service.job.ModelJobSupport;

/**
 * Basic mapping partitioner.
 * Splits the entity names resolved from the reindex types job parameter
 * into blocks and creates one step execution context per block.
 *
 * @author Mikhail Mikhailov
 */
@JobScope
public abstract class ReindexDataJobAbstractMappingPartitioner implements Partitioner, ModelJobSupport {
    /**
     * Class logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(ReindexDataJobAbstractMappingPartitioner.class);
    /**
     * Reindex types job parameter (comma separated), handed over to {@code getEntityList(..)}.
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_REINDEX_TYPES + "]}")
    protected String reindexTypes;
    /**
     * Number of entity names per partition. Read from configuration, 5 if the property is not set.
     */
    @Value("${" + ReindexJobConfigurationConstants.PROP_NAME_MAPPING_BLOCK_SIZE + ":5}")
    protected Long blockSize;
    /**
     * Meta model service, exposed via {@link #metaModelService()}.
     */
    @Autowired
    protected MetaModelService metaModelService;
    /**
     * Default constructor.
     */
    public ReindexDataJobAbstractMappingPartitioner() {
        // Nothing to initialize, fields are injected.
    }
    /**
     * Builds the partitions for the mapping step.
     * Entity names are collected in iteration order; each time the running block
     * reaches exactly {@link #blockSize} names, it is emitted as a partition.
     * Remaining names (if any) form the last partition. A block size, which is never
     * matched (zero or negative), therefore yields a single partition with all names.
     *
     * @param gridSize not used by this implementation
     * @return partition name to execution context map, where each context holds
     * the '|' joined entity names under {@link JobCommonParameters#PARAM_ENTITY_NAME}
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

        final List<String> names = getEntityList(reindexTypes);
        // Unbox once instead of on every comparison.
        final long limit = blockSize;

        final Map<String, ExecutionContext> partitions = new HashMap<>();
        final List<String> block = new ArrayList<>();

        int index = 0;
        for (String name : names) {

            block.add(name);
            if (block.size() != limit) {
                continue;
            }

            // Block is full - flush it and start the next one.
            final ExecutionContext full = contextOf(block);
            partitions.put(JobUtils.partitionName(index), full);
            block.clear();
            index++;
        }

        // Incomplete trailing block.
        if (!block.isEmpty()) {
            final ExecutionContext rest = contextOf(block);
            partitions.put(JobUtils.partitionName(index), rest);
        }

        // 'index' counts only full blocks here, the trailing one is still held in 'block'.
        final long total = (index * limit) + block.size();
        LOGGER.info("Collected {} partitions [{} entities].", partitions.size(), total);

        return partitions;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public MetaModelService metaModelService() {
        return metaModelService;
    }
    /**
     * Creates an execution context, carrying the given entity names as a single '|' separated string.
     *
     * @param entityNames the names of one block
     * @return new execution context
     */
    private static ExecutionContext contextOf(List<String> entityNames) {
        final ExecutionContext context = new ExecutionContext();
        context.putString(JobCommonParameters.PARAM_ENTITY_NAME, StringUtils.join(entityNames, "|"));
        return context;
    }
}
